4e9b3a55f781cde7760c71842b2884a6eaf75819,src/org/jgroups/protocols/pbcast/NAKACK2.java,NAKACK2,removeAndPassUp,#Table#Address#boolean#AsciiString#,879

Before Change


        boolean remove_msgs=discard_delivered_msgs && !loopback;
        MessageBatch batch=new MessageBatch(max_msg_batch_size).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
        Supplier<MessageBatch> batch_creator=() -> batch;
        while(true) {
            batch.reset();
            // We're removing as many msgs as possible and set processing to false (if null) *atomically* (wrt to add())
            // Don't include DUMMY and OOB_DELIVERED messages in the removed set
            buf.removeMany(processing, remove_msgs, max_msg_batch_size,
                           no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
                           batch_creator, BATCH_ACCUMULATOR, BATCH_VALIDATOR);
            if(batch.isEmpty()) {
                if(rebroadcasting)
                    checkForRebroadcasts();
                return;
            }
            deliverBatch(batch);
        }

    }

After Change


     *  Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool
     */
    protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name) {
        // The table's 'adders' counter decides who drains: only the caller that moves it from 0 to 1 proceeds,
        // every other caller has merely registered a request for one more pass and returns right away
        final AtomicInteger pending=buf.getAdders();
        final boolean drain_in_progress=pending.getAndIncrement() != 0;
        if(drain_in_progress)
            return;

        final boolean purge_delivered=discard_delivered_msgs && !loopback;

        // A single batch, sized by the table's current size, is recycled for every pass of the loop
        final MessageBatch reusable_batch=new MessageBatch(buf.size()).dest(null).sender(sender)
          .clusterName(cluster_name).multicast(true);
        final Supplier<MessageBatch> batch_supplier=() -> reusable_batch;

        while(true) {
            try {
                reusable_batch.reset();
                // Messages rejected by the filter (dummy, OOB-delivered, dont-loopback, going by its name) are
                // kept out of the batch.
                // NOTE(review): 0 is presumably "no limit" for the max number of results - confirm against
                // Table.removeMany()
                buf.removeMany(purge_delivered, 0, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
                               batch_supplier, BATCH_ACCUMULATOR);
            }
            catch(Throwable t) {
                // Logged only; execution continues with the emptiness check below
                log.error("failed removing messages from table for " + sender, t);
            }

            if(!reusable_batch.isEmpty())
                deliverBatch(reusable_batch);

            // Give back one registration: a non-zero remainder means other callers registered in the meantime,
            // so another pass is made on their behalf
            if(pending.decrementAndGet() == 0)
                break;
        }

        if(rebroadcasting)
            checkForRebroadcasts();
    }